今天,我們來學習 Rust 的並行程式設計特性。Rust 的設計目標之一就是提供安全且高效的並行程式設計支援,這使得 Rust 在處理複雜的多執行緒時特別有優勢。
Rust 標準函式庫提供了 std::thread 模組來支援執行緒操作。
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("你好,這是來自衍生執行緒的第 {} 次問候!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("你好,這是來自主執行緒的第 {} 次問候!", i);
thread::sleep(Duration::from_millis(1));
}
handle.join().unwrap();
}
Rust 提供了 channel 來實現執行緒間的安全通訊。
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("你好");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("收到:{}", received);
}
雖然訊息傳遞是很好的並行模型,但有時我們需要在多個執行緒間共享資料。Rust 提供了多種同步原語。
Mutex 保證了在同一時間只有一個執行緒可以存取資料。
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("結果:{}", *counter.lock().unwrap());
}
RwLock 允許多個讀取者或一個寫入者同時存取資料。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(5));
// 建立多個讀取執行緒
let mut read_handles = vec![];
for _ in 0..3 {
let data = Arc::clone(&data);
read_handles.push(thread::spawn(move || {
let value = data.read().unwrap();
println!("讀取的值:{}", *value);
}));
}
// 建立一個寫入執行緒
let write_handle = {
let data = Arc::clone(&data);
thread::spawn(move || {
let mut value = data.write().unwrap();
*value += 1;
println!("將值增加到:{}", *value);
})
};
// 等待所有執行緒完成
for handle in read_handles {
handle.join().unwrap();
}
write_handle.join().unwrap();
println!("最終值:{}", *data.read().unwrap());
}
對於簡單的共享計數器,我們可以使用 atomic 類型,它們提供了無鎖且執行緒安全的操作。
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
counter.fetch_add(1, Ordering::SeqCst);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("結果:{}", counter.load(Ordering::SeqCst));
}
Rust 的 async/await 特性允許我們編寫非阻塞的非同步程式碼。這需要使用 tokio 或其他非同步執行時期。
Cargo.toml
[dependencies]
rand = "0.8.5"
serde = { version = "1.0", features = ["derive"] }
thiserror = "1.0.63"
serde_json = "1.0.128"
tokio = "1.40.0"
reqwest = "0.12.7"
futures = "0.3.30"
use tokio;
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
println!("來自 tokio 任務的問候!");
});
// 等待非同步任務完成
handle.await.unwrap();
}
讓我們建立一個簡單的並行網頁爬蟲,它可以同時爬取多個網頁:
use std::sync::Arc;
use tokio;
use reqwest;
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Arc::new(reqwest::Client::new());
let urls = vec![
"https://www.rust-lang.org",
"https://github.com",
"https://www.wikipedia.org",
];
let fetches = stream::iter(urls)
.map(|url| {
let client = Arc::clone(&client);
tokio::spawn(async move {
let resp = client.get(url).send().await?;
println!("來自 {} 的回應:{} 位元組", url, resp.content_length().unwrap_or(0));
Ok::<_, reqwest::Error>(())
})
})
.buffer_unordered(3);
fetches.for_each(|result| async {
match result {
Ok(Ok(())) => {}
Ok(Err(e)) => eprintln!("發生錯誤:{}", e),
Err(e) => eprintln!("發生 JoinError:{}", e),
}
}).await;
Ok(())
}
這個例子使用了 tokio 執行和 reqwest 函式庫來並行地爬取多個網頁。
明天,我們來了解 Rust 的智慧指標和內部可變性,這些進階特性可以幫助我們更靈活地管理記憶體和共享資料。
channel 真的是好東西,Go 在這上面的應用很多
好 來學
Rust 也是有mpsc,我目前也還在學當中XD